[SPARK-38918][SQL] Nested column pruning should filter out attributes that do not belong to the current relation #36216
Conversation
cc @viirya fyi
 * attributes in the schema that do not belong to the current relation.
 */
-case class ProjectionOverSchema(schema: StructType) {
+case class ProjectionOverSchema(schema: StructType, output: Option[AttributeSet] = None) {
We don't always need it? It is `None` by default.
Looks like the `AttributeSet` is required for correctness. If we make it required, can we drop the `fieldNames` var below and just check the attribute set?
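(For reference, a minimal sketch of the attribute check under discussion, assuming the optional `output` signature shown in the diff above; the real `unapply` also rewrites nested extractors like `GetStructField` and `GetArrayStructFields`, which are omitted here:)

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression}
import org.apache.spark.sql.types.StructType

case class ProjectionOverSchema(schema: StructType, output: Option[AttributeSet] = None) {
  private val fieldNames = schema.fieldNames.toSet

  def unapply(expr: Expression): Option[Expression] = getProjection(expr)

  private def getProjection(expr: Expression): Option[Expression] = expr match {
    case a: AttributeReference
        // Match only attributes that are in the pruned schema AND, when an
        // output set is supplied, belong to the current relation's output.
        if fieldNames.contains(a.name) && output.forall(_.contains(a)) =>
      Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier))
    case _ => None // the real implementation handles nested extractors here
  }
}
```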
|where not exists (select null from employees e where e.name.first = c.name.first
|  and e.employer.name = c.employer.company.name)
|""".stripMargin)
checkAnswer(query, Row(3))
Should we check the pruned schema too?
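(One hedged way to do that, using the suite's `checkScan` helper; the pruned catalog strings below are illustrative guesses based on the fields the query touches, not copied from the merged test:)

```scala
// Illustrative only: assert the pruned read schema of each parquet scan.
// The struct strings are assumptions, not the verified expected values.
checkScan(query,
  "struct<name:struct<first:string>,employer:struct<company:struct<name:string>>>",
  "struct<name:struct<first:string>,employer:struct<name:string>>")
```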
}
}

testSchemaPruning("SPARK-38918: nested schema pruning with correlated subqueries") {
Without this PR, this test failed with `java.lang.RuntimeException: Once strategy's idempotence is broken for batch RewriteSubquery`.
Yes, it looks like a separate issue with column pruning and subquery rewrite (data source v1 only). I will investigate more.
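(For readability, a hypothetical reconstruction of the full test body inside `SchemaPruningSuite`, stitched from the fragments quoted above; the `select count(*)` line and the `contacts`/`employees` table names are assumptions based on the suite's fixtures and the plan log below:)

```scala
testSchemaPruning("SPARK-38918: nested schema pruning with correlated subqueries") {
  // `contacts` and `employees` are the suite's existing nested-schema tables (assumed).
  val query = sql(
    """select count(*) from contacts c
      |where not exists (select null from employees e where e.name.first = c.name.first
      |  and e.employer.name = c.employer.company.name)
      |""".stripMargin)
  checkAnswer(query, Row(3))
}
```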
viirya left a comment:
This fix looks good, with a few comments.
 override protected val excludedOnceBatches: Set[String] =
   Set(
     "PartitionPruning",
+    "RewriteSubquery",
I discovered that this `Once` batch is not idempotent. `ColumnPruning` and `CollapseProject` can be applied multiple times after correlated IN/EXISTS subqueries are rewritten. Happy to discuss other ways to fix/improve this batch. cc @cloud-fan
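(For context: Catalyst's `RuleExecutor` verifies a `Once` batch by applying it a second time and failing if the plan changes again, unless the batch is listed in `excludedOnceBatches`. A simplified paraphrase of that check, not the exact Spark source:)

```scala
// Simplified paraphrase of RuleExecutor's Once-batch idempotence check.
// `Batch` and `TreeType` are RuleExecutor's own types.
private def checkBatchIdempotence(batch: Batch, plan: TreeType): Unit = {
  // Re-apply every rule in the batch once more; a Once batch must be a fixed point.
  val reOptimized = batch.rules.foldLeft(plan) { case (p, rule) => rule(p) }
  if (!plan.fastEquals(reOptimized)) {
    throw new RuntimeException(
      s"Once strategy's idempotence is broken for batch ${batch.name}")
  }
}
```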
Attached the plan change log for the test case:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery ===
Aggregate [count(1) AS count(1)#164L] Aggregate [count(1) AS count(1)#164L]
+- Project +- Project
! +- Filter NOT exists#157 [name#117 && employer#122 && (name#152.first = name#117.first) && (employer#153.name = employer#122.company.name)] +- Join LeftAnti, ((name#152.first = name#117.first) AND (employer#153.name = employer#122.company.name))
! : +- Project [null AS NULL#163, name#152, employer#153] :- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet
! : +- Relation [id#151,name#152,employer#153] parquet +- Project [null AS NULL#163, name#152, employer#153]
! +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet +- Relation [id#151,name#152,employer#153] parquet
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
Aggregate [count(1) AS count(1)#164L] Aggregate [count(1) AS count(1)#164L]
+- Project +- Project
! +- Join LeftAnti, ((name#152.first = name#117.first) AND (employer#153.name = employer#122.company.name)) +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))
! :- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet :- Project [id#116, name#117.first AS _extract_first#167, address#118, pets#119, friends#120, relatives#121, employer#122.company.name AS _extract_name#169, relations#123, p#124]
! +- Project [null AS NULL#163, name#152, employer#153] : +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet
! +- Relation [id#151,name#152,employer#153] parquet +- Project [_extract_first#166, _extract_name#168]
! +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]
! +- Project [name#152, employer#153]
! +- Relation [id#151,name#152,employer#153] parquet
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
Aggregate [count(1) AS count(1)#164L] Aggregate [count(1) AS count(1)#164L]
+- Project +- Project
+- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169)) +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))
:- Project [id#116, name#117.first AS _extract_first#167, address#118, pets#119, friends#120, relatives#121, employer#122.company.name AS _extract_name#169, relations#123, p#124] :- Project [id#116, name#117.first AS _extract_first#167, address#118, pets#119, friends#120, relatives#121, employer#122.company.name AS _extract_name#169, relations#123, p#124]
: +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet : +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet
! +- Project [_extract_first#166, _extract_name#168] +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]
! +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168] +- Relation [id#151,name#152,employer#153] parquet
! +- Project [name#152, employer#153]
! +- Relation [id#151,name#152,employer#153] parquet
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
Aggregate [count(1) AS count(1)#164L] Aggregate [count(1) AS count(1)#164L]
+- Project +- Project
+- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169)) +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))
! :- Project [id#116, name#117.first AS _extract_first#167, address#118, pets#119, friends#120, relatives#121, employer#122.company.name AS _extract_name#169, relations#123, p#124] :- Project [_extract_first#167, _extract_name#169]
! : +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet : +- Project [name#117.first AS _extract_first#167, employer#122.company.name AS _extract_name#169]
! +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168] : +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet
! +- Relation [id#151,name#152,employer#153] parquet +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]
! +- Relation [id#151,name#152,employer#153] parquet
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
Aggregate [count(1) AS count(1)#164L] Aggregate [count(1) AS count(1)#164L]
+- Project +- Project
+- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169)) +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))
! :- Project [_extract_first#167, _extract_name#169] :- Project [name#117.first AS _extract_first#167, employer#122.company.name AS _extract_name#169]
! : +- Project [name#117.first AS _extract_first#167, employer#122.company.name AS _extract_name#169] : +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet
! : +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]
! +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168] +- Relation [id#151,name#152,employer#153] parquet
! +- Relation [id#151,name#152,employer#153] parquet
=== Result of Batch RewriteSubquery ===
Aggregate [count(1) AS count(1)#164L] Aggregate [count(1) AS count(1)#164L]
+- Project +- Project
! +- Filter NOT exists#157 [name#117 && employer#122 && (name#152.first = name#117.first) && (employer#153.name = employer#122.company.name)] +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))
! : +- Project [null AS NULL#163, name#152, employer#153] :- Project [name#117.first AS _extract_first#167, employer#122.company.name AS _extract_name#169]
! : +- Relation [id#151,name#152,employer#153] parquet : +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet
! +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]
!
We didn't find this before because we didn't have the test coverage?
Hmm, there seems to be a related test failure:
viirya left a comment:
Looks good. Just want to make sure the `excludedOnceBatches` change is not caused by this.
@viirya That's correct. It is not caused by this PR. The new test case happens to expose the idempotency issue that was not discovered before.
@allisonwang-db I saw you only list …
@viirya Yes! This fix also needs to be in 3.0/3.1/3.2.
Thanks. Merging to master/3.3.
… that do not belong to the current relation

### What changes were proposed in this pull request?
This PR updates `ProjectionOverSchema` to use the outputs of the data source relation to filter the attributes in the nested schema pruning. This is needed because the attributes in the schema do not necessarily belong to the current data source relation. For example, if a filter contains a correlated subquery, then the subquery's children can contain attributes from both the inner query and the outer query. Since the `RewriteSubquery` batch happens after early scan pushdown rules, nested schema pruning can wrongly use the inner query's attributes to prune the outer query data schema, thus causing wrong results and unexpected exceptions.

### Why are the changes needed?
To fix a bug in `SchemaPruning`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #36216 from allisonwang-db/spark-38918-nested-column-pruning.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 150434b)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
@allisonwang-db There are conflicts in 3.2/3.1/3.0. Can you create separate PR(s) for them?
… that do not belong to the current relation

This PR updates `ProjectionOverSchema` to use the outputs of the data source relation to filter the attributes in the nested schema pruning. This is needed because the attributes in the schema do not necessarily belong to the current data source relation. For example, if a filter contains a correlated subquery, then the subquery's children can contain attributes from both the inner query and the outer query. Since the `RewriteSubquery` batch happens after early scan pushdown rules, nested schema pruning can wrongly use the inner query's attributes to prune the outer query data schema, thus causing wrong results and unexpected exceptions.

To fix a bug in `SchemaPruning`.

No

Unit test

Closes apache#36216 from allisonwang-db/spark-38918-nested-column-pruning.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 150434b)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 793ba60)
Signed-off-by: allisonwang-db <allison.wang@databricks.com>
…butes that do not belong to the current relation

### What changes were proposed in this pull request?
Backport #36216 to branch-3.1.

### Why are the changes needed?
To fix a bug in `SchemaPruning`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #36387 from allisonwang-db/spark-38918-branch-3.1.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
…butes that do not belong to the current relation

### What changes were proposed in this pull request?
Backport #36216 to branch-3.0.

### Why are the changes needed?
To fix a bug in `SchemaPruning`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #36388 from allisonwang-db/spark-38918-branch-3.0.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
…butes that do not belong to the current relation

### What changes were proposed in this pull request?
Backport #36216 to branch-3.2.

### Why are the changes needed?
To fix a bug in `SchemaPruning`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #36386 from allisonwang-db/spark-38918-branch-3.2.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
This PR updates `ProjectionOverSchema` to use the outputs of the data source relation to filter the attributes in the nested schema pruning. This is needed because the attributes in the schema do not necessarily belong to the current data source relation. For example, if a filter contains a correlated subquery, then the subquery's children can contain attributes from both the inner query and the outer query. Since the `RewriteSubquery` batch happens after the early scan pushdown rules, nested schema pruning can wrongly use the inner query's attributes to prune the outer query's data schema, causing wrong results and unexpected exceptions.
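(A hedged sketch of the corresponding call-site change in the early scan pushdown rule; the variable names and the `Some(...)` wrapping follow the optional signature shown in the review diff, and are assumptions rather than the merged code:)

```scala
// Thread the relation's own output into ProjectionOverSchema so attributes
// from a correlated subquery's inner plan can no longer match (assumed shape).
val projectionOverSchema =
  ProjectionOverSchema(prunedDataSchema, Some(AttributeSet(relation.output)))
```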
### Why are the changes needed?
To fix a bug in `SchemaPruning`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test